package com.kafka.cn.lucubrate.consumer;

import com.alibaba.fastjson.JSONObject;
import org.springframework.kafka.annotation.KafkaHandler;
import org.springframework.kafka.annotation.KafkaListener;
import org.springframework.kafka.support.Acknowledgment;
import org.springframework.stereotype.Controller;
import org.springframework.web.bind.annotation.RequestMapping;

@Controller
@RequestMapping("/send-consumer")
public class ConsumerController {


    /**
     * 订阅主题消息，当该主题有新的消息时，会自动执行
     * @param data 生产者的数据
     * @param ack 提交偏移量offset
     */
    @KafkaListener(topics = {"first"}, groupId = "bran")
    @KafkaHandler
    public void user(String data, Acknowledgment ack){
        JSONObject jsonObject = JSONObject.parseObject(data);
        Object id = jsonObject.get("id");
        Object name = jsonObject.get("name");
        Object age = jsonObject.get("age");
        System.out.println(id.toString() + " - " + name.toString() + " - " + age.toString());
        //拿到数据，业务逻辑处理

        //提交offset
        ack.acknowledge();
    }
}
